package com.chenfan.process.config.thread;

import com.chenfan.ccp.executor.SmartExecutorService;
import com.chenfan.ccp.executor.TraceContext;
import com.chenfan.ccp.executor.TraceContextHolder;
import com.chenfan.infra.context.TenantContextHolder;
import com.chenfan.infra.context.UserContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolTaskConfig {

    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

        int i = Runtime.getRuntime().availableProcessors();

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //线程池创建的核心线程数，线程池维护线程的最少数量，即使没有任务需要执行，也会一直存活
        executor.setCorePoolSize(i*2);

        //如果设置allowCoreThreadTimeout=true（默认false）时，核心线程会超时关闭
        //executor.setAllowCoreThreadTimeOut(true);

        //阻塞队列 当核心线程数达到最大时，新任务会放在队列中排队等待执行
        executor.setQueueCapacity(i*2*10);

        //最大线程池数量，当线程数>=corePoolSize，且任务队列已满时。线程池会创建新线程来处理任务
        //任务队列已满时, 且当线程数=maxPoolSize，，线程池会拒绝处理任务而抛出异常
        executor.setMaxPoolSize(i*2);

        //当线程空闲时间达到keepAliveTime时，线程会退出，直到线程数量=corePoolSize
        //允许线程空闲时间30秒，当maxPoolSize的线程在空闲时间到达的时候销毁
        //如果allowCoreThreadTimeout=true，则会直到线程数量=0
        executor.setKeepAliveSeconds(60);

        //spring 提供的 ThreadPoolTaskExecutor 线程池，是有setThreadNamePrefix() 方法的。
        //jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
        executor.setThreadNamePrefix("自定义线程池-");

        // rejection-policy：拒绝策略：当线程数已经达到maxSize的时候，如何处理新任务
        // CallerRunsPolicy()：交由调用方线程运行，比如 main 线程；如果添加到线程池失败，那么主线程会自己去执行该任务，不会等待线程池中的线程去执行
        // AbortPolicy()：该策略是线程池的默认策略，如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
        // DiscardPolicy()：如果线程池队列满了，会直接丢掉这个任务并且不会有任何异常
        // DiscardOldestPolicy()：丢弃队列中最老的任务，队列满了，会将最早进入队列的任务删掉腾出空间，再尝试加入队列
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.setTaskDecorator(new AsyncTaskDecorator());

        executor.initialize();
        return executor;
    }
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        //获取cpu核心数
        int i = Runtime.getRuntime().availableProcessors();
        //核心线程数
        int corePoolSize = i * 2;
        //最大线程数
        int maximumPoolSize = i * 2;
        //线程无引用存活时间
        long keepAliveTime = 60;
        //时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //任务队列，接收一个整型的参数，这个整型参数指的是队列的长度，
        //ArrayBlockingQueue(int,boolean)，boolean类型的参数是作为可重入锁的参数进行初始化，默认false，另外初始化了notEmpty、notFull两个信号量。
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
        //1. 同步阻塞队列 (put,take)，直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池，当线程池中没有空闲线程时，创建一个新的线程处理此任务。
        // 这种策略需要线程池具有无限增长的可能性。实现为：SynchronousQueue
        //2. 有界队列。当线程池中线程达到corePoolSize时，新进任务被放在队列里排队等待处理。有界队列（如ArrayBlockingQueue）有助于防止资源耗尽，
        // 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷：使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销，
        // 但是可能导致人工降低吞吐量。如果任务频繁阻塞（例如，如果它们是 I/O 边界），则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小，
        // CPU 使用率较高，但是可能遇到不可接受的调度开销，这样也会降低吞吐量。
        //3. 无界队列。使用无界队列（例如，不具有预定义容量的 LinkedBlockingQueue）将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
        // 这样，创建的线程就不会超过 corePoolSize。（因此，maximumPoolSize 的值也就无效了。）当每个任务完全独立于其他任务，即任务执行互不影响时，
        // 适合于使用无界队列；例如，在 Web 页服务器中。这种排队可用于处理瞬态突发请求，当命令以超过队列所能处理的平均数连续到达时，此策略允许无界线程具有增长的可能性。

        //线程工厂
        //defaultThreadFactory()
        //返回用于创建新线程的默认线程工厂。
        //privilegedThreadFactory()
        //返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
        ThreadFactory threadFactory =Executors.defaultThreadFactory();
        //拒绝执行处理器
        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
        //创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        return threadPoolExecutor;
    }
    @Bean(name = "executorService")
    @ConditionalOnMissingBean(ExecutorService.class)
    public ExecutorService executorService() {
        return SmartExecutorService.newFixedThreadPool();
    }

    class AsyncTaskDecorator implements TaskDecorator {

        @Override
        public Runnable decorate(Runnable runnable) {
            TraceContext traceContext = TraceContextHolder.getCurrentContext();
            Long tenantId = TenantContextHolder.getTenantId();
            return () -> {
                try {
                    log.info("=== TaskDecorator tenantId: {} ===", tenantId);
                    TraceContextHolder.setContext(traceContext);
                    TenantContextHolder.setTenantId(tenantId);
                    runnable.run();
                } catch (Exception e) {
                    log.error("AsyncTaskDecorator.decorate error:{}", e);
                } finally {
                    TraceContextHolder.clear();
                    TenantContextHolder.clear();
                }
            };
        }
    }

}